package com.example.hotitemanalysis;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

public class GroupTopN {
    /**
     * Builds and runs the streaming job.
     *
     * <p>Lines are read from a socket on localhost:9090 and split into (word, 1) pairs.
     * The pairs are summed per word over a sliding processing-time window, then the
     * per-word counts are regrouped by the first character of the word and the top 5
     * entries of each group are printed every 20 seconds.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        final String jobName = "首字母分组TopN";

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use processing time as the time semantics for the windows below.
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        // Source: listen on the given socket port and read text lines from it.
        final DataStream<String> lines = env.socketTextStream("localhost", 9090);
        // Split each line into words, each paired with an initial count of 1.
        final DataStream<Tuple2<String, Integer>> wordOnes = lines.flatMap(new LineSplitter());

        // Sliding window covering 600s in total, advancing every 20s.
        final SlidingProcessingTimeWindows countingWindow =
                SlidingProcessingTimeWindows.of(Time.seconds(600), Time.seconds(20));
        final DataStream<Tuple2<String, Integer>> wordCounts = wordOnes
                .keyBy(0)               // key by tuple field 0, i.e. the word
                .window(countingWindow)
                .sum(1);                // add up tuple field 1 (the count) per key

        // 20s tumbling window that gathers the upstream per-word counts.
        final TumblingProcessingTimeWindows rankingWindow =
                TumblingProcessingTimeWindows.of(Time.seconds(20));
        wordCounts
                .keyBy(new TupleKeySelectorByStart()) // group by the first character of the word
                .window(rankingWindow)
                .process(new TopNFunction(5))
                .print(jobName);

        env.execute(jobName);
    }


    /**
     * Splits each input line into lower-cased words and emits one (word, 1) pair per word.
     */
    private static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        /** Word delimiter: one or more non-word characters. Compiled once rather than per line. */
        private static final Pattern NON_WORD = Pattern.compile("\\W+");

        /**
         * Tokenizes one line and emits a (token, 1) tuple for every non-empty token.
         *
         * @param value the input line
         * @param out   collector receiving the (word, 1) pairs
         */
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // Lower-case with Locale.ROOT so the result does not depend on the JVM's default
            // locale. Under e.g. a Turkish locale "I" becomes dotless "ı", which \W treats
            // as a delimiter and would therefore cut words apart.
            String[] tokens = NON_WORD.split(value.toLowerCase(Locale.ROOT));
            for (String token : tokens) {
                // A line that starts with a delimiter yields a leading empty token; skip it.
                if (!token.isEmpty()) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }

    /**
     * Key selector that groups (word, count) tuples by the first character of the word.
     */
    private static class TupleKeySelectorByStart implements
            KeySelector<Tuple2<String, Integer>, String> {
        /**
         * Returns the first character of the tuple's word as a one-character string.
         *
         * @param value the (word, count) tuple
         * @return the leading character of {@code value.f0}
         * @throws Exception declared by the interface; {@code substring(0, 1)} itself
         *                   fails with an index-out-of-bounds error when the word is empty
         */
        @Override
        public String getKey(Tuple2<String, Integer> value) throws Exception {
            final String word = value.f0;
            // The leading character is the grouping key.
            final String leadingChar = word.substring(0, 1);
            return leadingChar;
        }

    }

    /**
     * Top-N function for a keyed window, extending {@code ProcessWindowFunction}.
     *
     * <p>For each window it emits at most {@code topSize} of the window's tuples, ordered by
     * count (tuple field {@code f1}) from largest to smallest. Tuples with equal counts keep
     * the order in which the window's iterable supplied them.
     */
    private static class TopNFunction extends ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow> {
        /**
         * Orders tuples by count, largest first. Unlike a comparator that never returns 0,
         * this one honours the {@link Comparator} contract, so sorting with it is well-defined.
         */
        private static final Comparator<Tuple2<String, Integer>> BY_COUNT_DESC =
                new Comparator<Tuple2<String, Integer>>() {
                    @Override
                    public int compare(Tuple2<String, Integer> left, Tuple2<String, Integer> right) {
                        return Integer.compare(right.f1, left.f1);
                    }
                };

        /** Maximum number of tuples emitted per window. */
        private final int topSize;

        /**
         * @param topSize maximum number of tuples to emit per window; values of zero or
         *                less result in nothing being emitted
         */
        public TopNFunction(int topSize) {
            this.topSize = topSize;
        }

        /**
         * Ranks the tuples of one window by count and emits the leading {@code topSize} of them.
         *
         * @param key     the key of the window being evaluated (not used)
         * @param context the window context (not used)
         * @param input   the tuples collected in the window
         * @param out     collector receiving the top-ranked tuples, largest count first
         */
        @Override
        public void process(
                String key,
                ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>.Context context,
                Iterable<Tuple2<String, Integer>> input,
                Collector<Tuple2<String, Integer>> out) throws Exception {

            List<Tuple2<String, Integer>> ranked = new ArrayList<Tuple2<String, Integer>>();
            for (Tuple2<String, Integer> element : input) {
                ranked.add(element);
            }

            // Collections.sort is stable, so tuples with equal counts stay in arrival order
            // and the earliest-seen ones win when ties straddle the cut-off.
            Collections.sort(ranked, BY_COUNT_DESC);

            int limit = Math.min(topSize, ranked.size());
            for (int i = 0; i < limit; i++) {
                out.collect(ranked.get(i));
            }
        }

    }
}
